集成才是硬道理! 用它构建一个完整的Hadoop
The following article is from 小猴学Java Author 小猴
# 本章主题
Hudi典型应用场景
Hudi对比Kudu、Hive、Hbase
Hudi对Streaming的支持
Hudi核心概念
Hudi核心API
1
应用场景
1
高效数据采集
我们构建Data Lake需要从各种各样的数据源将数据抽取到Hadoop中,例如:一些网站流量日志、关系型数据库、IOT等等。而目前,一个大数据系统中,会采用各种各样的不同采集工具来进行数据采集。比较常见的:使用Flume采集日志数据、使用Sqoop/DataX将RDBMS的数据全量或者增量导入、通过Canel将MySQL中的数据实时摄取、通过Oracle Golden Gate将Oracle的数据实时摄取。但抽取的数据源很多,不得不维护多套不同架构的采集系统。
而Hudi提供的Upserts操作可以更易维护地采集RDBMS、以及向HBase/Cassandra这样的NoSQL或者是Kafka这样的消息队列。
2
ALL IN ONE近实时数据分析
在建立企业级的数据系统时,如果要构建实时的数据集市,可以采用Apache Druid、MemSQL或者是OpenTSDB来提供支持,这种方式对于较小的数据量的查询(例如:系统系统、或者交互式实时分析)是没有问题的。而如果Hadoop上的数据过时了,但供实时数据集市的这部分的过期数据仍然保留着,这些数据很少再被交互式查询。这样会导致资源利用率不足,造成硬件资源的浪费。
在Hadoop上基于Presto和Spark SQL的交互式SQL查询,可以在几秒内完成查询。Apache Hudi,可以在几分钟内完成数据的更新,它可以对多个存储在DFS上的大表进行分析计算。Apache Hudi没有外部依赖(例如:要使用Kylin就得有一套专门用于sub-second的HBase集群),所以使用Apache Hudi可以在不增加额外开销的情况下,对新数据进行更快的查询分析。
3
增量式数据处理管道
传统的数据流处理,大都都是基于DAG的工作流,基于Hadoop上形成一条数据管道。在传统的数据仓库中,一般使用一个新的HDFS文件夹、或者是一个Hive分区来组织数据。例如:通过sqoop,可以以每个小时为单位生成一个Hive分区。并在每个小时结束的时候,将数据同步到HDFS。而下游的任务,等到数据提取完后,继续调度执行。如果下游任务的计算需要1个小时,这一共就是2个小时以上的延迟。
而在一些移动互联网和物联网应用中,数据是不断上传的。而且在传输数据过程中,也会出现数据延迟到达的情况,为了保证数据的正确性,每个小时都需要重跑最后几个小时的数据。为了解决数据的延迟去重跑数据校验结果,会严重影响系统的性能。大家可以想象以下,如果有很多的工作流,每个小时都需要重跑TB级的数据,其影响有多大不言而喻。这也是大部分批处理系统的问题所在:数据的粒度是以分区为单位的。
Hudi提供了以record为粒度(不再是以分区或者文件夹为粒度),来处理上游Hudi表中的新数据。并且可以Update延迟的数据,并且可以支持更高频率的连续调度,例如:15分钟、5分钟。
4
从容应对数据冗余
HDFS有一个经典场景就是把数据文件进行物理切分为block,再将block分布到集群中。接着就可以供应用程序使用了。举一个关于滴滴出行的例子,通过建立一个基于Spark的pipeline,然后从Hadoop存储的日志中找到紧急刹车的事件,然后把这些紧急刹车的事件装载到ElasticSearch中,其他应用通过读取、分析、处理ElasticSearch上的数据,供安全驾驶策略、系统使用。而每天都有大量用户在使用滴滴出行,为了避免写入大量的事件压垮ElasticSearch(服务存储),典型的做法是先写入到消息队列,比较流行地做法就是选择Kafka,然后再消费Kafka中的数据,写入到ElasticSearch。这种做法在业界目前使用很普遍,但是有个问题,Kafka和DFS会冗余大量数据。
而Hudi将每个管道的Spark Pipeline Upsert输出放入到Hudi表中,然后对表进行增量跟踪(用起来就像Kafka主题一样)来获取新的数据,再并入到ElasticSearch(服务存储)中。
2
对比其他经典组件
Apache Hudi填补了在分布式文件系统处理数据的空白,它可以很好地和DFS共存。但在做技术选型时,需要对各种方案有所了解,这样才能权衡利弊,找到适合当前业务场景的技术方案。
1
Apache Kudu
Kudu是一个存储系统,它的设计目标是旨在能够支持Upsert,还能够支持对PB级的数据规模进行实时分析。Kudu好有理想哦,还尝试能够充当OLTP workload的存储。而Hudi并没有尝试这样。
Kudu与HDFS完全不同,Hudi旨在和DFS一起使用(例如:HDFS、S3等),Hudi并没有自己的存储服务器,它借助于DFS存储数据,并借助于Spark来完成计算。所以,Hudi是一套兼容、集成的方案,能够更好的融入到Hadoop生态和Spark生态。而Kudu就不一样了,我们需要自己构建一套Kudu的集群,很多时候要买新的硬件资源。
2
Hive事务
Hive 0.13提供了完整的ACID特性,并支持事务。Hive尝试在基于ORC的文件格式基础上,来实现merge-on-read(后面会讲到)的存储。Hive事务是必须基于Hive的Metastore之上启动的Hive任务或者查询实现的,而Hudi可以充分利用Spark处理框架的所有功能。Hudi作为库嵌入到Spark中,实现和维护起来会容易很多。Hudi的这种设计,是可以在可以完全脱离Hive的计算引擎(例如:Presdo、Spark)使用,将来还会提供除了parquet之外更多的文件格式支持。
3
HBase
尽管HBase是一个为OLTP workload设计的key-value存储系统,因为HBase和Hadoop挨着,所以很多人把HBase和分析关联在一起,HBase对写操作进行了大量优化,可以支持亚秒级更新。而Hive-on-HBase以及Phoenix,让可以用户用SQL地方式查询数据。
但根据在分析上的workload性能表现,混合式的列式存储(例如:Parquet/ORC)要比HBase的StoreFile要好得多,因为分析主要的性能压力都是来自于大量数据的读取,而不是写入。
而Hudi的出现,弥补了这一块缺憾。有了Hudi,可以减少数据与分析性存储格式的差距。HBase并不像Hudi一样,提供了支持提交时间、增量拉取之类的增量处理primitive(原语)。
3
Hudi对流处理的支持
Hudi是可以与目前的批处理、和流处理集成,并将计算结果存储在Hadoop中。对于Spark来说就比较简单些,直接把Hudi的库和Spark/Spark Streaming整合。对于Flink,可以先在Flink进行数据处理,然后将处理后的数据推入到Kafka中或者HDFS中间文件,让导入到Hudi的表中。所以Hudi相当于是一种Source或者Sink。相信不久的将来,Flink-hudi-connector就会到来。
大家可以看一下这个:
https://hudi.apache.org/blog/apache-hudi-meets-apache-flink/
4
Hudi核心概念
Hudi基于HDFS提供了以下两类primitive:
1、Update/Delete
2、Change Streams——数据被Update的事件流
1
Timeline
Hudi会自动维护不同时间对表的所有操作。
这样,我们可以即时查询任意时间的数据。
Hudi instant包含了一下几个组件:
1、Instant action:在表上执行的操作类型
2、Instant time:操作时间(例如:20201220013122),时间戳是单调递增的
3、state:instant的当前状态
Hudi执行的关键操作包含有6个部分:
1、COMMIT:将一批原子写入原子操作提交到表中
2、CLEANS:通过后台启动删除表中不再需要用到的旧文件
3、DELTA_COMMIT:增量提交,将一批记录原子写入到Merge-On-Read类型的表中。
4、COMPACTION:通过后台启动进行数据压缩。例如:将基于行存储的数据转换为列存储格式。在HUDI内部,COMPACTION是timeline上的特殊的COMMIT
5、ROLLBACK:当提交时出现问题进行回滚,删除在写期间产生的文件
6、SAVEPOINT:将一些文件标记为saved,这样后台删除程序就不再会清理他们。在出现故障或者数据恢复时,可以将数据还原到timeline的某个时间点。
Hudi在任意的时间是以下三种状态之一:
1、REQUESTED:已经调度该操作,但还未启动
2、INFLIGHT:表示正在执行操作
3、COMPLETE:表示在timeline上完成了操作
上面这幅图是根据Hudi官网重制的。我们可以看到:
1、看一下Timeline,包含了从10:00到10:20发生的所有更新事件,大约5分钟一次
2、Hudi会在时间轴上保留提交、COMPACTION以及CLEAN的元数据
3、Hudi数据的组织反映了数据的事件时间
Hudi每一次提交都会在Timeline有一项记录,并将数据以不同的时间段划分,但有延迟的数据进来,可以继续更新。所以在数据消费端,总是能够看到最新的数据。延迟的数据通过Upsert将更新的数据生成到之前的时间段的目录中。通过Timeline,可以获取从10:00开始成功提交的所有数据的增量查询。
2
文件管理
Hudi中的表其实就是HDFS指定路径下的目录结构。与Hive非常类似,一个Hudi表可以有多个分区,分区也是以文件夹组织的。每个分区都有相对于基本路径的唯一标识。
在分区中,文件以group方式组织,每个group都有自己的ID。每个group可以包含几个文件切片,每个切片被提交后生成parquet文件。
3
索引
Hudi通过索引机制,提供高效地Upsert操作。索引是基于给定的hoodie key(record的key + 分区路径)映射到文件id。一旦record的第一个版本写入到文件中,这个用于索引的hoodie key就不会再变了。索引其实就是包含一组record的所有版本的映射。
4
表类型与查询类型
Hudi的表类型定义了数据如何在HDFS中进行索引、存储目录以及primitive、和timeline的各类操作(如何写入数据)。而查询类型定义了读取数据。以下是Hudi支持的表类型与查询类型。
1、Copy on write表类型:支持快照查询、增量查询
2、Merge on read表类型:支持快照查询、增量查询、读取优化查询。
核心概念:Copy on write和Merge on read
Copy on write类型的表,仅使用列式文件格式存储数据(当前是parquet文件格式)。在写入数据文件过程中,同时进行数据合并,更新文件版本并重写文件。
Merge on read类型的表,会使用列式(parquet)和行式(avro)文件格式来存储数据。数据的更新会记录到增量文件中,然后以同步或者异步地方式生成新版本的文件。这种方式可以让提交的频率更低。而关键在于Compaction策略。(有点类似HBase),这种方式可以有效避免写放大。
对比这两种不同类型的表:
5
核心概念:查询类型
Snapshot Queries(快照查询)
这种查询方式可以某个commit或者compaction的最新快照。如果是merge-on-read类型的表,会动态地将最新的文件切片和增量文件合并来提供近实时的数据(几分钟)。如果是copy-on-write类型表,可以直接从已经存在的parquet类型的表中查询数据,并同时提供查询从其他side write写入的功能。
Incremental Queries(增量查询)
Hudi提供非常方便的基于时间戳的增量查询。在Hudi中,只有一个提交和Compact已经完成,查询才能看到写入表的新数据。增量查询可以有效地提供change stream。
Read optimized Queries(读优化查询)
这种查询可以查看到提交/压缩后的最新快照。这种方式仅仅读取已经完成的切片数据文件(parquet文件),并保证与列式查询同样性能。
同样,也来对比一下这两类查询:
增量查询到的数据是低延迟的,因为它会将增量日志的数据合并在一起返回。而Read Optimized查询的延迟性较高,因为它不进行合并读。而增量查询查询延迟较高,是因为每次都要合并查询,而Read Optimized查询无需合并,直接将parquet文件查询返回。
5
Hudi核心API(on spark)
了解了Hudi的核心概念之后。我们就可以开心地使用Spark啦。本次,我们来实现Hudi的6个最基础的操作:
1、插入数据
2、查询数据
3、更新数据
4、增量查询数据
5、指定时间查询
6、删除数据
1
项目准备工作
导入Spark依赖和Hudi依赖。本次使用的spark版本为2.4.7、scala版本为2.12、hadoop为2.7.5、hudi版本为0.6。
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<properties>
<spark-version>2.4.7</spark-version>
<scala-version>2.12</scala-version>
<hadoop-version>2.7.5</hadoop-version>
<hudi-version>0.6.0</hudi-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala-version}</artifactId>
<version>${spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala-version}</artifactId>
<version>${spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-bundle_${scala-version}</artifactId>
<version>${hudi-version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.11</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala-version}</artifactId>
<version>${spark-version}</version>
</dependency>
</dependencies>
2
准备订单生成器
为了方便测试,我们来编写一个订单的数据生成器。其中,订单包含以下几个字段:
1、订单ID(使用UUID生成)
2、订单价格
3、订单提交日期
4、订单状态
5、订单商品数量
// 订单实体类
case class Order(id: String, price: BigDecimal, commit_date: Long, status: Int, count: Int, dt:String)
// 生成指定数量的订单
def orderGen(num: Int) = {
val r = new Random()
val sdf = new SimpleDateFormat("yyyy-MM-dd")
(0 to num).map(n => {
val now = new Date()
Order(UUID.randomUUID().toString, BigDecimal.decimal(r.nextFloat() * 1000), now.getTime, r.nextInt(6), r.nextInt(10) + 1, sdf.format(now))
}).toList
}
3
Insert Data
将DataFrame数据导入到Hudi需要留意这及格配置:
1、INSERT/DELETE的并行度配置:
hoodie.insert.shuffle.parallelism
hoodie.upsert.shuffle.parallelism
这里使用的是QuickStart的配置,两个并行度都是2
2、表名,此处指的是元数据中的表名。注意,Hudi并不会为表自动生成目录,需要我们在path中指定表存放的位置。
HoodieWriteConfig.TABLE_NAME
3、表类型,默认为COPY_ON_WRITE
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY
4、记录的key
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY
5、如果有记录出现一样的key,会选择提交预聚合配置的字段值最大的那个
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY
6、配置用于分区的字段(可以指定多个)
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
7、配置是否采用Hive的分区组织方式
DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY
注意:调用save的时候,需要指定把表的存放位置放进去。
更多的配置参考:
https://hudi.apache.org/docs/configurations.html#write-options
// 1. 构建Spark运行环境
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.default.parallelism", 4)
.appName("Load order data")
.getOrCreate()
// 2. 生成10000个订单,并转换为DataFrame
val orderDF = spark.createDataFrame(orderGen(10000))
// 3. 写入到Hudi
// 可以在https://hudi.apache.org/docs/configurations.html中找到详细配置
orderDF.write.format("hudi")
// 配置Hudi insert和update的并行度为2,可以通过hudi客户端传入
.options(getQuickstartWriteConfigs)
// 配置表名【必须】
.option(HoodieWriteConfig.TABLE_NAME, "tbl_order")
// 配置表类型【默认为COPY_ON_WRITE】
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
// 配置record的key【默认uuid】
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
// 配置预聚合的字段【默认ts】
// 如果两个record的key相同时,会取该字段指定的最大的那个
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "commit_date")
// 指定分区字段【默认partitionpaht】
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
// 指定是否采用类似Hive的分区组织方式【默认:false】
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, true)
.mode(SaveMode.Append)
.save("/hudi/tbl_order")
4
Query Data
Hudi查询数据非常方便,Spark直接read就可以了。只需要指定从什么位置读取数据即可。因为之前创建的表只有一个层级的分区,所以读取的时候使用的是/hudi/*,如果是多级分区可以使用/hudi/*/*…
// 1. 构建Spark运行环境
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.default.parallelism", 4)
.appName("Load order data")
.getOrCreate()
spark.read.format("hudi")
.load("/hudi/tbl_order/*")
.createOrReplaceTempView("tbl_order");
// 查看tbl_order的数据
spark.sql("select * from tbl_order").show()
spark.sql("select status, count(id) from tbl_order where dt = '2021-01-02' group by status").show()
执行结果如下:
查询2021-01-02的数据并按照状态进行分组聚合
+------+---------+
| 1| 168464|
| 3| 168263|
| 5| 168218|
| 4| 168764|
| 2| 167997|
| 0| 168296|
+------+---------+
5
Update Data
更新数据和插入数据非常像。因为默认OPERATION_OPT_KEY配置的是:UPSERT,是支持插入和更新的。我们只需要将需要更新的数据直接写入到Hudi即可,但必须要将record key带上。
// 1. 构建Spark运行环境
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.default.parallelism", 4)
.appName("Load order data")
.getOrCreate()
// 2. 获取所有状态为0的数据,并将状态更新为1
spark.read.format("hudi")
.load("/hudi/tbl_order/*")
.createOrReplaceTempView("tbl_order")
val updateDF = spark.sql("select id, price, commit_date, 1 as status, count, dt from tbl_order where status = 0")
// 3. 写入到Hudi
// 可以在https://hudi.apache.org/docs/configurations.html中找到详细配置
updateDF.write.format("hudi")
// 配置Hudi insert和update的并行度为2,可以通过hudi客户端传入
.options(getQuickstartWriteConfigs)
// 配置表名【必须】
.option(HoodieWriteConfig.TABLE_NAME, "tbl_order")
// 配置表类型【默认为COPY_ON_WRITE】
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
// 配置record的key【默认uuid】
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
// 配置预聚合的字段【默认ts】
// 如果两个record的key相同时,会取该字段指定的最大的那个
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "commit_date")
// 指定分区字段【默认partitionpaht】
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
// 指定是否采用类似Hive的分区组织方式【默认:false】
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, true)
.mode(SaveMode.Append)
.save("/hudi/tbl_order")
5
Incremental Query Data
增量查询需要指定一个读取参数:BEGIN_INSTANTTIME_OPT_KEY,通过该配置我们可以查询出来某个时间点数据写入Hudi的数据。
// 1. 构建Spark运行环境
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.default.parallelism", 4)
.appName("Load order data")
.getOrCreate()
spark.read.format("hudi")
// 指定进行增量查询
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
// 指定查询从2021年1月2号18点之后提交的数据
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20210102180000")
.load("/hudi/tbl_order/*")
.createOrReplaceTempView("tbl_order_incremental");
spark.sql("select * from tbl_order_incremental").show()
spark.sql("select count(*) from tbl_order_incremental").show()
6
Point in time Query Data
指定时间范围查询和incremental query类似,只需要指定一个END_INSTANTTIME_OPT_KEY即可。
spark.read.format("hudi")
// 指定进行增量查询
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
// 指定查询从2021年1月2号18点之后提交的数据
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20210102183100")
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, "20210102183126")
.load("/hudi/tbl_order/*")
.createOrReplaceTempView("tbl_order_incremental");
spark.sql("select * from tbl_order_incremental").show()
spark.sql("select count(*) from tbl_order_incremental").show()
7
Delete Data
Hudi支持两种方式来删除数据,一种是软删除、另一种是硬删除(物理删除)。
软删除的方式就是保留record的key,将其他字段设置为null。
硬删除就是物理删除,将record的数据从表中删除。有三种方式:
设置OPERATION_OPT_KEY 为DELETE_OPERATION_OPT_VAL
设置PAYLOAD_CLASS_OPT_KEY
org.apache.hudi.EmptyHoodieRecordPayload。此操作将会在DataSet提交时移除所有的数据。
在DataSource中添加名为_hoodie_is_deleted列到DataSet中,将要删除的record这一列设置为true。对于要重新upsert的数据,把这列设置为false或者null即可。
删除数据只需要将Hoodie record的key传入即可。操作方式与INSERT/UPDATE类似。只是需要将OPERATION_OPT_KEY参数设置为delete即可。
// 1. 构建Spark运行环境
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.default.parallelism", 4)
.appName("Load order data")
.getOrCreate()
val hudiDF = spark.read.format("hudi")
.load("/hudi/tbl_order/*")
hudiDF.createOrReplaceTempView("tbl_order")
spark.sql("select count(id) from tbl_order").show(100)
val deleteAllDF = spark.sql("select id from tbl_order")
// 3. 从Hudi中删除所有数据
// 可以在https://hudi.apache.org/docs/configurations.html中找到详细配置
deleteAllDF.write.format("hudi")
// 配置Hudi insert和update的并行度为2,可以通过hudi客户端传入
.options(getQuickstartWriteConfigs)
// 配置删除【默认为upsert】
// DELETE_OPERATION_OPT_VAL为软删除
// DELETE_OPERATION_OPT_VAL为软删除
// .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
// 配置表名【必须】
.option(HoodieWriteConfig.TABLE_NAME, "tbl_order")
// 配置record的key【默认uuid】
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
// 配置预聚合的字段【默认ts】
// 如果两个record的key相同时,会取该字段指定的最大的那个
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "commit_date")
// 指定分区字段【默认partitionpaht】
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
// 指定是否采用类似Hive的分区组织方式【默认:false】
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, true)
.mode(SaveMode.Append)
.save("/hudi/tbl_orde")
spark.sql("select * from tbl_order").show()
# 总结
Hudi通过集成Hadoop、Spark,而且操作方式非常简洁,未来可期,一定会受到越来越多人的关注。能够基于现有的一套解决方案,为什么要引入更多的复杂、资源、架构呢?
未来可期,Hudi!
以上
参考文献:
https://hudi.apache.org/docs/use_cases.html
https://hudi.apache.org/docs/concepts.html#copy-on-write-table
https://hudi.apache.org/docs/quick-start-guide.html
推荐阅读
重磅!华为云湖仓一体FusionInsight集成Apache Hudi
Lakehouse: 统一数据仓库和高级分析的新一代开放平台